package com.youlai.redis.config;

import com.youlai.redis.service.RedisMessageListener;
import com.youlai.redis.service.RedisStreamListener;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.time.Duration;
import java.util.Properties;

/**
 * Spring configuration that wires the Redis infrastructure used by this module:
 * a Lettuce connection factory, a {@link RedisTemplate}, a pub/sub listener container,
 * a stream listener container, and a Redisson client with its blocking/delayed queues.
 */
@Configuration
public class RedisConfig {

    private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);

    /** Lowest Redis major version accepted by the stream listener container. */
    private static final int MIN_REDIS_MAJOR_VERSION = 5;

    /** Pub/sub channel the {@link RedisMessageListener} is subscribed to. */
    private static final String PUBSUB_CHANNEL = "pubsub:example";

    /** Stream key, consumer group and consumer name used by the stream listener. */
    private static final String STREAM_KEY = "exampleStreamKey";
    private static final String STREAM_GROUP = "exampleGroup";
    private static final String STREAM_CONSUMER = "ExampleConsumer";

    /** Name of the Redisson blocking queue. */
    private static final String TOKEN_RENEWAL_QUEUE = "TOKEN-RENEWAL";

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.port}")
    private Integer redisPort;

    // The password is optional: an absent property resolves to an empty string,
    // which redisson() treats as "no password".
    @Value("${spring.redis.password:}")
    private String password;

    /**
     * Builds a standalone Lettuce connection factory from the bound {@link RedisProperties}
     * (host, port, database index and password).
     *
     * @param redisProperties the Spring Boot Redis properties
     * @return the Lettuce connection factory
     */
    @Bean
    public LettuceConnectionFactory lettuceConnectionFactory(RedisProperties redisProperties) {
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(redisProperties.getHost());
        configuration.setPort(redisProperties.getPort());
        configuration.setDatabase(redisProperties.getDatabase());
        configuration.setPassword(redisProperties.getPassword());

        return new LettuceConnectionFactory(configuration);
    }

    /**
     * Creates the shared {@link RedisTemplate}: String keys, Jackson JSON values,
     * connection exposure and transaction support enabled.
     *
     * @param lettuceConnectionFactory the connection factory backing the template
     * @return the configured template
     */
    @Bean
    public RedisTemplate<?, ?> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate<?, ?> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory);
        template.setExposeConnection(true);
        template.setEnableTransactionSupport(true);
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        RedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(jacksonSerializer);
        return template;
    }

    /**
     * Registers the pub/sub message subscriber on the {@code pubsub:example} channel.
     *
     * @param redisTemplate template whose connection factory the container uses
     * @return the listener container
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisTemplate<String, String> redisTemplate) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        container.addMessageListener(new RedisMessageListener(), new ChannelTopic(PUBSUB_CHANNEL));
        return container;
    }

    /**
     * Creates the stream listener container, makes sure the consumer group exists, and
     * registers {@link RedisStreamListener} as an auto-acknowledging consumer.
     * The container is started and stopped through the bean's init/destroy methods.
     *
     * @param redisTemplate template used for the version check, group creation and connections
     * @return the stream listener container
     * @throws IllegalStateException if the Redis server version is below 5 or cannot be determined
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
            RedisTemplate<String, Object> redisTemplate) {
        // The Redis server version must be at least 5.0.
        checkRedisVersion(redisTemplate);

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // Read (poll) timeout.
                        .pollTimeout(Duration.ofSeconds(3))
                        // Maximum number of messages fetched per poll.
                        .batchSize(10)
                        // Record values are converted to String.
                        .targetType(String.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(), options);

        createConsumerGroupIfAbsent(redisTemplate);

        // Consumer group, stream key and read-offset configuration.
        container.receiveAutoAck(
                Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                new RedisStreamListener());

        return container;
    }

    /**
     * Creates the consumer group up front so that the subscription does not fail later.
     * A "group already exists" error is expected on every restart and logged at info level;
     * any other failure is logged with its cause instead of being mislabelled, and startup
     * continues as before (best effort).
     */
    private void createConsumerGroupIfAbsent(RedisTemplate<String, Object> redisTemplate) {
        try {
            redisTemplate.opsForStream().createGroup(STREAM_KEY, STREAM_GROUP);
        } catch (Exception e) {
            if (isGroupAlreadyExists(e)) {
                logger.info("消费者组已存在 不再重复创建");
            } else {
                logger.warn("创建消费者组失败 streamKey={} group={}", STREAM_KEY, STREAM_GROUP, e);
            }
        }
    }

    /**
     * Reports whether the given failure (or any of its causes) carries the Redis
     * {@code BUSYGROUP} error code in its message.
     */
    private static boolean isGroupAlreadyExists(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            String message = current.getMessage();
            if (message != null && message.contains("BUSYGROUP")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reads {@code redis_version} from the server INFO output and rejects servers older
     * than major version 5.
     *
     * @throws IllegalStateException if the version is missing, unparsable, or too old
     */
    private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
        Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
        String version = info == null ? null : info.getProperty("redis_version");
        if (version == null || version.trim().isEmpty()) {
            throw new IllegalStateException("无法从 Redis INFO 中获取 redis_version");
        }
        int majorVersion = parseMajorVersion(version);
        if (majorVersion < MIN_REDIS_MAJOR_VERSION) {
            throw new IllegalStateException(String.format("Redis 版本为 %s，小于最低要求的 5.0.0 版本！请重新进行安装。", version));
        }
    }

    /**
     * Extracts the leading major number from a version string such as {@code "6.2.1"}.
     * A version without any dot is parsed as a whole.
     */
    private static int parseMajorVersion(String version) {
        String trimmed = version.trim();
        int dotIndex = trimmed.indexOf('.');
        String major = dotIndex < 0 ? trimmed : trimmed.substring(0, dotIndex);
        try {
            return Integer.parseInt(major);
        } catch (NumberFormatException e) {
            throw new IllegalStateException(String.format("无法解析 Redis 版本号: %s", version), e);
        }
    }

    /**
     * Creates a single-server Redisson client from the configured host, port and password.
     *
     * @return the Redisson client
     */
    @Bean
    public RedissonClient redisson() {
        // An empty password is normalised to null so it is not passed on as a credential.
        String effectivePassword = (password == null || password.isEmpty()) ? null : password;

        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + redisHost + ":" + redisPort)
                .setPassword(effectivePassword)
                .setConnectionPoolSize(64)              // connection pool size
                .setConnectionMinimumIdleSize(8)        // minimum number of idle connections kept
                .setConnectTimeout(1500)                // connect timeout
                .setTimeout(2000)                       // command timeout, counted from the moment the command was sent
                .setRetryAttempts(2)                    // retry attempts for a failed command
                .setRetryInterval(1000);                // interval between command retries
        return Redisson.create(config);
    }

    /**
     * Exposes the {@code TOKEN-RENEWAL} blocking queue.
     *
     * @param redissonClient the Redisson client
     * @return the blocking queue
     */
    @Bean
    public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
        return redissonClient.getBlockingQueue(TOKEN_RENEWAL_QUEUE);
    }

    /**
     * Exposes a delayed queue whose destination is the given blocking queue.
     *
     * @param blockingQueue  destination queue
     * @param redissonClient the Redisson client
     * @return the delayed queue
     */
    @Bean
    public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
                                              RedissonClient redissonClient) {
        return redissonClient.getDelayedQueue(blockingQueue);
    }

}

